Python作业开发实践

Lindorm计算引擎通过HTTP RESTful API的方式提供Spark Python作业提交入口,您可以按照这种方式运行流批任务、机器学习和图计算任务。本文介绍Lindorm计算引擎Python作业开发的详细步骤。

前提条件

已开通Lindorm计算引擎,具体操作请参见开通与变配

Spark Python作业开发流程

  1. 准备Spark Python作业

  2. 打包Spark Python作业

  3. 上传Spark Python作业

  4. 提交Spark Python作业

步骤一:准备Spark Python作业

  1. 下载Spark Python作业示例压缩包,下载链接为Spark Python作业示例

  2. 解压Spark Python作业示例压缩包,解压后的目录名称为lindorm-spark-examples。打开lindorm-spark-examples/python目录,参考python目录结构。

  3. 项目开发的根目录以your_project为例,介绍项目的目录结构。

    1. your_project目录下新建__init__.py文件,内容为空。

    2. 改造入口文件。

      1. 在入口文件中编写代码,将your_project添加到sys.path中,代码详情请参见Spark Python作业示例中的lindorm-spark-examples/python/your_project/main.py文件的Notice1部分。

        # Notice1: You need to do the following step to complete the code modification:
        # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py
        # Step2: Please add current dir to sys.path, you should add the following code to your main file
        current_dir = os.path.abspath(os.path.dirname(__file__))
        sys.path.append(current_dir)
        print("current dir in your_project: %s" % current_dir)
        print("sys.path: %s \n" % str(sys.path))
      2. 在入口文件中将入口逻辑封装到main(argv)方法中。代码详情请参见Spark Python作业示例中的lindorm-spark-examples/python/your_project/main.py文件的Notice2部分。

        # Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function,
        # so that launcher.py in parent directory just call main(sys.argv)
        def main(argv):
            print("Receive arguments: %s \n" % str(argv))
        
            print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__)))
            # Write your code here
        
        
        if __name__ == "__main__":
            main(sys.argv)
    3. 创建Spark Python作业启动的入口文件,用来调用main(argv)方法。在根目录your_project创建同级目录launcher.py,可以复制Spark Python作业示例中的lindorm-spark-examples/python/launcher.py文件。

步骤二:打包Spark Python作业

  1. 打包项目依赖的Python环境和第三方类库。推荐使用Conda或者Virtualenv将依赖类库打包为tar包,具体操作请参见Python Package Management

    重要
    • 使用Conda或者Virtualenv打的tar包通过spark.archives传递,可以是spark.archives支持的所有格式。详细说明,请参见spark.archives

    • 请在Linux环境下完成该操作,以保证Lindorm计算引擎能正常识Python二进制文件。

  2. 打包项目文件。将your_project文件打包为.zip或者.egg格式文件。

    • 执行以下命令将项目文件打包为.zip格式文件。

      zip -r project.zip your_project
    • 将项目文件打包为.egg格式文件,具体操作请参见Building Eggs

步骤三:上传Spark Python作业

将以下文件都上传至OSS,具体操作请参见上传文件

  • 步骤二中打包的Python环境和第三方类库(也就是tar包)。

  • 步骤二中打包的项目文件(也就是.zip或者.egg文件)。

  • 步骤一中的launcher.py文件。

步骤四:提交Spark Python作业

Lindorm计算引擎支持以下两种方式提交并管理作业。

请求参数包括以下两个部分:

  • Python作业环境参数说明,示例如下:

    {"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
    • 提交Python作业项目文件(也就是.zip.egg或者.py格式的文件)时,请配置configs参数中的spark.submit.pyFiles

    • 提交Python环境和第三方类库(也就是tar包)时,请配置configs参数中的spark.archivesspark.kubernetes.driverEnv.PYSPARK_PYTHON

      • 配置spark.archives参数时使用井号(#)指定targetDir

      • 配置spark.kubernetes.driverEnv.PYSPARK_PYTHON参数指定Python文件路径。

  • 如果将文件上传至OSS,需要在configs参数中配置以下信息。

    表 1. 配置configs相关参数

    参数

    示例值

    说明

    spark.hadoop.fs.oss.endpoint

    oss-cn-beijing-internal.aliyuncs.com

    存储Python文件的OSS地址。

    spark.hadoop.fs.oss.accessKeyId

    testAccessKey ID

    通过阿里云控制台创建的Access Key ID和Access Key Secret,获取方法请参见创建AccessKey

    spark.hadoop.fs.oss.accessKeySecret

    testAccessKey Secret

    spark.hadoop.fs.oss.impl

    固定值:org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem

    访问OSS的类。

    说明

    更多参数请参见参数说明

Python作业开发示例

  1. 下载并解压Spark Python作业示例

  2. 改造入口文件,打开Python目录下的your_project/main.py文件并配置相关代码。

    1. your_project目录添加到sys.path中。

      current_dir = os.path.abspath(os.path.dirname(__file__))
      sys.path.append(current_dir)
      print("current dir in your_project: %s" % current_dir)
      print("sys.path: %s \n" % str(sys.path))
    2. main.py文件中添加入口逻辑,如下示例初始化SparkSession。

      from pyspark.sql import SparkSession
      spark = SparkSession \
          .builder \
          .appName("PythonImportTest") \
          .getOrCreate()
      print(spark.conf)
      spark.stop()
  3. 在Python目录下打包your_project文件。

    zip -r your_project.zip your_project
  4. 在Linux环境下,使用Conda打包Python运行环境。

    conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz
  5. 将打包好的your_project.zippyspark_conda_env.tar.gz上传至OSS,并将Python目录下的launcher.py文件上传至OSS。

  6. 通过以下两种方式提交作业。

作业诊断

Python作业提交成功后,可以在作业列表页面查看作业运行状况和SparkUI访问地址,具体操作请参见查看作业。作业提交过程中如果有其他问题,请提交工单并将JobID和WebUI地址提供给工单处理人员。